"""Support functions for working with wheel files. """ from __future__ import absolute_import import logging from email.parser import Parser from zipfile import ZipFile from pip._vendor.packaging.utils import canonicalize_name from pip._vendor.pkg_resources import DistInfoDistribution from pip._vendor.six import PY2, ensure_str from pip._internal.exceptions import UnsupportedWheel from pip._internal.utils.pkg_resources import DictMetadata from pip._internal.utils.typing import MYPY_CHECK_RUNNING if MYPY_CHECK_RUNNING: from email.message import Message from typing import Dict, Tuple from pip._vendor.pkg_resources import Distribution if PY2: from zipfile import BadZipfile as BadZipFile else: from zipfile import BadZipFile VERSION_COMPATIBLE = (1, 0) logger = logging.getLogger(__name__) class WheelMetadata(DictMetadata): """Metadata provider that maps metadata decoding exceptions to our internal exception type. """ def __init__(self, metadata, wheel_name): # type: (Dict[str, bytes], str) -> None super(WheelMetadata, self).__init__(metadata) self._wheel_name = wheel_name def get_metadata(self, name): # type: (str) -> str try: return super(WheelMetadata, self).get_metadata(name) except UnicodeDecodeError as e: # Augment the default error with the origin of the file. raise UnsupportedWheel( "Error decoding metadata for {}: {}".format( self._wheel_name, e ) ) def pkg_resources_distribution_for_wheel(wheel_zip, name, location): # type: (ZipFile, str, str) -> Distribution """Get a pkg_resources distribution given a wheel. :raises UnsupportedWheel: on any errors """ info_dir, _ = parse_wheel(wheel_zip, name) metadata_files = [ p for p in wheel_zip.namelist() if p.startswith("{}/".format(info_dir)) ] metadata_text = {} # type: Dict[str, bytes] for path in metadata_files: # If a flag is set, namelist entries may be unicode in Python 2. # We coerce them to native str type to match the types used in the rest # of the code. This cannot fail because unicode can always be encoded # with UTF-8. full_path = ensure_str(path) _, metadata_name = full_path.split("/", 1) try: metadata_text[metadata_name] = read_wheel_metadata_file( wheel_zip, full_path ) except UnsupportedWheel as e: raise UnsupportedWheel( "{} has an invalid wheel, {}".format(name, str(e)) ) metadata = WheelMetadata(metadata_text, location) return DistInfoDistribution( location=location, metadata=metadata, project_name=name ) def parse_wheel(wheel_zip, name): # type: (ZipFile, str) -> Tuple[str, Message] """Extract information from the provided wheel, ensuring it meets basic standards. Returns the name of the .dist-info directory and the parsed WHEEL metadata. """ try: info_dir = wheel_dist_info_dir(wheel_zip, name) metadata = wheel_metadata(wheel_zip, info_dir) version = wheel_version(metadata) except UnsupportedWheel as e: raise UnsupportedWheel( "{} has an invalid wheel, {}".format(name, str(e)) ) check_compatibility(version, name) return info_dir, metadata def wheel_dist_info_dir(source, name): # type: (ZipFile, str) -> str """Returns the name of the contained .dist-info directory. Raises AssertionError or UnsupportedWheel if not found, >1 found, or it doesn't match the provided name. """ # Zip file path separators must be / subdirs = list(set(p.split("/")[0] for p in source.namelist())) info_dirs = [s for s in subdirs if s.endswith('.dist-info')] if not info_dirs: raise UnsupportedWheel(".dist-info directory not found") if len(info_dirs) > 1: raise UnsupportedWheel( "multiple .dist-info directories found: {}".format( ", ".join(info_dirs) ) ) info_dir = info_dirs[0] info_dir_name = canonicalize_name(info_dir) canonical_name = canonicalize_name(name) if not info_dir_name.startswith(canonical_name): raise UnsupportedWheel( ".dist-info directory {!r} does not start with {!r}".format( info_dir, canonical_name ) ) # Zip file paths can be unicode or str depending on the zip entry flags, # so normalize it. return ensure_str(info_dir) def read_wheel_metadata_file(source, path): # type: (ZipFile, str) -> bytes try: return source.read(path) # BadZipFile for general corruption, KeyError for missing entry, # and RuntimeError for password-protected files except (BadZipFile, KeyError, RuntimeError) as e: raise UnsupportedWheel( "could not read {!r} file: {!r}".format(path, e) ) def wheel_metadata(source, dist_info_dir): # type: (ZipFile, str) -> Message """Return the WHEEL metadata of an extracted wheel, if possible. Otherwise, raise UnsupportedWheel. """ path = "{}/WHEEL".format(dist_info_dir) # Zip file path separators must be / wheel_contents = read_wheel_metadata_file(source, path) try: wheel_text = ensure_str(wheel_contents) except UnicodeDecodeError as e: raise UnsupportedWheel("error decoding {!r}: {!r}".format(path, e)) # FeedParser (used by Parser) does not raise any exceptions. The returned # message may have .defects populated, but for backwards-compatibility we # currently ignore them. return Parser().parsestr(wheel_text) def wheel_version(wheel_data): # type: (Message) -> Tuple[int, ...] """Given WHEEL metadata, return the parsed Wheel-Version. Otherwise, raise UnsupportedWheel. """ version_text = wheel_data["Wheel-Version"] if version_text is None: raise UnsupportedWheel("WHEEL is missing Wheel-Version") version = version_text.strip() try: return tuple(map(int, version.split('.'))) except ValueError: raise UnsupportedWheel("invalid Wheel-Version: {!r}".format(version)) def check_compatibility(version, name): # type: (Tuple[int, ...], str) -> None """Raises errors or warns if called with an incompatible Wheel-Version. pip should refuse to install a Wheel-Version that's a major series ahead of what it's compatible with (e.g 2.0 > 1.1); and warn when installing a version only minor version ahead (e.g 1.2 > 1.1). version: a 2-tuple representing a Wheel-Version (Major, Minor) name: name of wheel or package to raise exception about :raises UnsupportedWheel: when an incompatible Wheel-Version is given """ if version[0] > VERSION_COMPATIBLE[0]: raise UnsupportedWheel( "{}'s Wheel-Version ({}) is not compatible with this version " "of pip".format(name, '.'.join(map(str, version))) ) elif version > VERSION_COMPATIBLE: logger.warning( 'Installing from a newer Wheel-Version (%s)', '.'.join(map(str, version)), )